Add an Akka Stream Sink graph stage for Kinesis. #47
Conversation
…h -> KinesisSinkGraphStage
I haven't forgotten about this, it's just been a crazy week!
@markglh I totally understand. Looking forward to seeing your review.
@aquamatthias I'll get this reviewed today. In answer to your questions:
Awesome work again, thanks @aquamatthias !
Just a few comments / questions
README.md
Outdated
### Akka Stream Sink

An Akka `Sink` is provided which can be used to publish messages via streams.
Every message is send as `ProduserEvent` to the `Sink`, which defines the PartitionKey as well as the payload.
tiny grammar correction: Every message is sent
Grammar, my old enemy - thanks for the correction.
README.md
Outdated
Every message is send as `ProduserEvent` to the `Sink`, which defines the PartitionKey as well as the payload.
The `Sink` is created from a `ProducerConf` or directly with a `KinesisProducerActor`. See [Kinesis](https://github.com/WW-Digital/reactive-kinesis/blob/master/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala) for the various options.

The `Sink` expects an acknowledgement for every messages send to Kinesis.
expects an acknowledgement for every message sent to Kinesis
Done.
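For readers following along, a minimal sketch of the documented usage. It assumes a `Kinesis.sink("some-producer")` style factory in the linked `com.weightwatchers.reactive.kinesis.stream.Kinesis` object; the producer name, the import paths and the `ProducerEvent(partitionKey, payload)` constructor call are illustrative and may differ from the final API.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.stream.Kinesis

object PublishExample extends App {
  implicit val system: ActorSystem             = ActorSystem("kinesis-sink-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Every element sent to the Sink is a ProducerEvent: partition key plus payload.
  val events = (1 to 100).map(i => ProducerEvent(s"partition-$i", s"payload-$i"))

  // "some-producer" is a placeholder for a producer section in application.conf.
  Source(events).runWith(Kinesis.sink("some-producer"))
}
```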
README.md
Outdated
The `Sink` expects an acknowledgement for every messages send to Kinesis.
An amount of unacknowledged messages can be configured, before back pressure is applied.
See the throttling conf for defining this configuration value.
Might be clearer as:
> This throttling is controlled by the `kinesis.{producer}.akka.max-outstanding-requests` configuration value.
Indeed much better.
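To make that concrete, a hedged sketch of what the configuration fragment might look like; the `some-producer` section name and the surrounding keys are illustrative, only the `akka.max-outstanding-requests` path follows the suggestion above.

```scala
import com.typesafe.config.ConfigFactory

// Illustrative HOCON fragment only; key names other than
// akka.max-outstanding-requests are assumptions, not the library's documented config.
val config = ConfigFactory.parseString(
  """
    |kinesis {
    |  some-producer {
    |    akka {
    |      # Back pressure is applied once this many messages are awaiting acknowledgement.
    |      # A default of 1000 is assumed when the value is not configured.
    |      max-outstanding-requests = 1000
    |    }
    |  }
    |}
  """.stripMargin
)
```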
README.md
Outdated
Please note: a default value (1000 messages) is applied, if throttling is not configured.

The provided `Sink` produces a `Future[Done]` as materialized value.
This future succeeds, if all messages from upstream are send to Kinesis and acknowledged.
just swap out `send` for `sent` :)
Done.
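Since the materialized `Future[Done]` comes up repeatedly, a small sketch of how a caller might observe it, reusing the hypothetical `Kinesis.sink` factory, the `events` value and the implicit `ActorSystem`/materializer from the earlier sketch:

```scala
import akka.Done
import akka.stream.scaladsl.{Keep, Source}
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Succeeds only after every upstream element has been sent to Kinesis and acknowledged;
// fails if the stage fails (e.g. an unrecoverable send error).
val done: Future[Done] =
  Source(events).toMat(Kinesis.sink("some-producer"))(Keep.right).run()

import system.dispatcher
done.onComplete {
  case Success(Done) => system.log.info("all events published and acknowledged")
  case Failure(ex)   => system.log.error(ex, "publishing the stream failed")
}
```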
// Holds all outstanding messages by its message identifier.
// Note: the identifier is created here locally and does not make sense outside of this stage.
val outstandingMessages: mutable.AnyRefMap[String, ProducerEvent] = mutable.AnyRefMap.empty
Couple of questions about this:

1. If there's a break in sending messages (quiet period), will it complete and stop? As in, when `outstandingMessages` is empty.
2. Do we need to store the outstanding messages, or could we instead defer to `producer.outstandingRecordsCount()` like the underlying actor does? I'm thinking about memory consumption here, albeit a small amount.
3. When we receive a `SendFailed` - is failing the stage the right thing to do? As in, should we instead retry? Happy to defer this logic to another PR if it's worthwhile, I'm on the fence right now. Hard to know how often this will happen in real world usage. If often, then it's worth it.
4. Regarding the earlier discussion about re-using the `maxOutstandingRequests` config. Am I correct in saying the following:
   - We'll stop requesting more messages when this count is hit (e.g. 1000 messages), which in theory should mean we won't actually throttle. So the fact that throttling is enabled won't matter because we won't start buffering messages within the actor.
   - The reason I implemented this was to reduce the number of futures in play (thereby saving memory and reducing work within the KPL). But it can potentially add latency depending on the value of `throttling-retry-millis` - so my question here is, do we ever want this to happen here? If not, then it does actually make sense to use a different property - which goes against my earlier answer :)
- Q: If there's a break in sending messages (quiet period), will it complete and stop? As in, when `outstandingMessages` is empty.
  - A: No. The stream finishes only if upstream finishes or the stream fails. See the SourceQueue example added to the Readme (and the sketch after this list): all messages sent to the `SourceQueue` get published to Kinesis.
- Q: Do we need to store the outstanding messages, or could we instead defer to `producer.outstandingRecordsCount()` like the underlying actor does? I'm thinking about memory consumption here, albeit a small amount.
  - A: The graph stage runs async with the actor and the producer - a decision based on `outstandingRecordsCount()` of the producer might be wrong. Since there is also no need to store a reference to all outstanding messages, I reworked my solution to only use an internal counter (and removed the map). This should be correct and has no memory impact.
- Q: When we receive a `SendFailed` - is failing the stage the right thing to do? As in, should we instead retry? Happy to defer this logic to another PR if it's worthwhile, I'm on the fence right now. Hard to know how often this will happen in real world usage. If often, then it's worth it.
  - A: Correct - the stage would fail in this case. I think adding retry logic totally makes sense, and it should be implemented in `KinesisProducerActor` as a separate PR. I created "KinesisProducerActor: failed sends should be retried" #50 to reflect this.
- Q: We'll stop requesting more messages when this count is hit (e.g. 1000 messages), which in theory should mean we won't actually throttle. So the fact that throttling is enabled won't matter because we won't start buffering messages within the actor.
  - A: Akka streams are reactive: events are sent downstream, demand is sent upstream. The sink signals demand until `maxOutstandingMessages` messages are outstanding. If this number is reached, no demand is signalled upstream, which also means that no more messages flow through the stream. The source of the stream will not emit more messages until demand is signalled.
- Q: The reason I implemented this was to reduce the number of futures in play (thereby saving memory and reducing work within the KPL). But it can potentially add latency depending on the value of `throttling-retry-millis` - so my question here is, do we ever want this to happen here? If not, then it does actually make sense to use a different property - which goes against my earlier answer.
  - A: If the source is faster than the sink, the number of outstanding messages will grow over time, which will result in an application failure. IMHO it is necessary to define a maximum number of outstanding messages and to react (e.g. upstream should not produce more messages until demand is signalled) to prevent this. This is what akka streams provide out of the box, and this is also the reason to use a sensible throttling default. Since the number can be configured and can get arbitrarily large, I don't think we will suffer from latency.
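To illustrate the first answer above, a hedged sketch of the SourceQueue pattern (again assuming the hypothetical `Kinesis.sink` factory and an implicit materializer in scope): the stream stays alive through quiet periods and completes only when the queue itself is completed.

```scala
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Source}
import com.weightwatchers.reactive.kinesis.models.ProducerEvent

// Materializes both the queue handle and the sink's Future[Done].
val (queue, done) =
  Source
    .queue[ProducerEvent](bufferSize = 1000, OverflowStrategy.backpressure)
    .toMat(Kinesis.sink("some-producer"))(Keep.both)
    .run()

// Offer messages whenever they arrive; a quiet period does not complete the stream.
queue.offer(ProducerEvent("partition-key", "payload"))

// Only completing the queue (i.e. upstream) completes the stream and the Future[Done].
queue.complete()
```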
👍 thanks!
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

class KinesisSinkGraphStageSpec
Good coverage of scenarios here!
One comment: some aspects might be simpler using the streams testkit - and it may avoid having to resolve the futureValue: https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-testkit.html#streams-testkit
For example, using the `TestSource` to assert exceptions / completion and perhaps introduce breaks in the flow of messages more easily (to allow assertions between messages).
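For reference, a rough sketch of what the testkit-driven variant could look like inside a spec that already has an implicit materializer in scope; `kinesisSink` is a stand-in for whatever sink the spec under test builds, and the `ProducerEvent(partitionKey, payload)` calls mirror the README excerpt rather than the exact constructor.

```scala
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.TestSource
import com.weightwatchers.reactive.kinesis.models.ProducerEvent

// Drive the sink element by element from the test instead of a pre-built Source.
val (probe, done) =
  TestSource.probe[ProducerEvent]
    .toMat(kinesisSink)(Keep.both) // kinesisSink: placeholder for the sink under test
    .run()

probe.sendNext(ProducerEvent("key-1", "payload-1"))
// ... assertions between elements can go here ...
probe.sendNext(ProducerEvent("key-2", "payload-2"))
probe.sendComplete()

// The materialized Future[Done] can still be checked afterwards,
// e.g. with ScalaFutures' whenReady / futureValue or Await.result.
```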
TestKit is great. The materialized `Future` produced by the sink is part of the GraphStage that I wanted to test as well. If you see a test case not covered that you would like to see, please let me know.
🎉 🎉 🎉
Since there is now a `Source` as Kinesis Consumer, a `Sink` is needed to act as Kinesis Producer.

This PR adds a `KinesisSinkGraphStage` that uses an underlying `KinesisProducerActor` to do the heavy lifting. The stream manages back pressure by allowing only a fixed number of outstanding messages. A materialized value is used to indicate when a stream has finished (or failed). A unit test as well as an integration test is in place.
Open topics: